Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add semaphore to wait for streams to become available. #38

Closed
wants to merge 3 commits into from
Closed

[WIP] Add semaphore to wait for streams to become available. #38

wants to merge 3 commits into from

Conversation

ndhansen
Copy link

@ndhansen ndhansen commented Mar 3, 2020

This PR adds a semaphore lock to prevent exceptions when the MAX_CONCURRENT_STREAMS limit is reached. It now instead waits for a stream to close instead of raising an exception.

This bug was initially reported in httpx.

Some questions / comments:

What timeout should be used (if any)?

Both AsyncSemaphore and SyncSemaphore allow a timeout to be passed when acquiring. Should my code accept any sort of timeout?

Should semaphores support with statements?

The abstract semaphore interface doesn't implement any entry or exit functions, preventing the use of the with statement. I presume this is because this would ignore the timeout condition, and could lead to programming errors in the future.

Here are some possible ideas:

  1. We don't add any entry or exit functions. This prevents using semaphores without timeouts when timeouts might be appropriate.
  2. We add entry and exit functions that call acquire without timeouts.
  3. We add the ability to pass a timeout when creating the semaphore, and use that timeout as the default when using entry and exit functions.

What exception should be thrown if a semaphore timeout is reached?

I'm currently using PoolTimeout, but it's a noop becuase without a timeout it's never going to be called anyway. Should I just pass None?

else:
self.state = ConnectionState.ACTIVE
finally:
self.streams_semaphore.release()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I think the wrapping here isn't quite correct. It'd make sense to instead wrap up the portion of this method after the connection init, so...

# Wrap this up inside the stream semaphore
h2_stream = AsyncHTTP2Stream(stream_id=stream_id, connection=self)
self.streams[stream_id] = h2_stream
self.events[stream_id] = []
return await h2_stream.request(method, url, headers, stream, timeout)

Since that'd ensure that we hold the semaphore for the duration of a request/response.

In fact, it's actually a bit more awkward that tho. We don't want to release the semaphore until the client has closed the response we've returned. So really we want to release the semaphore either on an exception occuring within this block, or on the response being closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, something like...

async def request(...):
    ...
    await self.streams_semaphore.acquire()
    try:
        h2_stream = AsyncHTTP2Stream(stream_id=stream_id, connection=self)
        self.streams[stream_id] = h2_stream
        self.events[stream_id] = []
        return await h2_stream.request(method, url, headers, stream, timeout)
    except:
        self.streams_semaphore.release()
        raise

...
async def close_stream(self, stream_id: int) -> None:
    self.streams_semaphore.release()
    ...

Alternately we could pass the semaphore to AsyncHTTP2Stream, and wrap up the behaviour there instead. (That approach might feel a bit more tightly constrained)

@tomchristie
Copy link
Member

tomchristie commented May 13, 2020

Also, answers to some of the questions here...

  • Semaphores shouldn't support with, simply because we won't ever use it. That's because it's a fiddly case of "acquire a semaphore, and either close it on response close, or if an exception occurs before returning the response".
  • We should use the pool acquiry timeout value, and the PoolTimeout exception class.

@tomchristie
Copy link
Member

Closing this now in favour of #89 - thanks so much for your initial time on it - it's been really helpful!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants